package redis_stream

import (
	"encoding/json"
	"errors"
	"fmt"
	"github.com/go-redis/redis"
	"sync"
	"time"
)

// streamC drives the consumer side of a Redis stream on behalf of a single
// ConsumerGroup: it reads messages, hands them to the group's callback and
// acknowledges them.
type streamC struct {
	// group is the consumer group being served. Run assigns it (on its own
	// value-receiver copy) before starting any goroutine.
	group *ConsumerGroup
}

// Run starts consuming the group's stream and blocks until every consumer
// goroutine has exited.
//
// It creates the consumer group (and the stream, if missing), launches
// group.option.ConsumerCount consumer goroutines and one maintenance
// goroutine. Each consumer alternates between reading new messages (">") and
// re-reading its own pending messages ("0"), pausing 25ms between rounds, and
// exits as soon as bind reports an error.
//
// Run never returns nil: once all consumers have stopped it returns an error
// naming the stream. The maintenance goroutine and its ticker are shut down
// when Run returns.
func (s streamC) Run(group *ConsumerGroup) error {
	// s is a value receiver: this assignment only affects the copy captured by
	// the goroutines started below, not the caller's streamC.
	s.group = group

	// Best effort: the result is deliberately not inspected. Redis answers with
	// a BUSYGROUP error when the group already exists, which is the normal case
	// on every start after the first one.
	s.group.client.client.XGroupCreateMkStream(s.group.stream, defaultGroup, "0")

	var wg sync.WaitGroup
	for i := 0; i < s.group.option.ConsumerCount; i++ {
		wg.Add(1)

		go func(i int) {
			defer wg.Done()

			for {
				// New, never-delivered messages first ...
				if err := s.bind(i, ">"); err != nil {
					return
				}
				// ... then this consumer's own still-pending messages.
				if err := s.bind(i, "0"); err != nil {
					return
				}
				time.Sleep(time.Millisecond * 25)
			}
		}(i)
	}

	// done tells the maintenance goroutine to stop once Run returns; without it
	// the goroutine and its ticker would outlive Run forever (immediately so
	// when ConsumerCount is zero).
	done := make(chan struct{})
	defer close(done)

	// Every 5 minutes, reclaim stale pending messages and trim the stream to
	// its configured maximum length.
	go func() {
		ticker := time.NewTicker(time.Minute * 5)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				s.XClaim()

				s.group.client.client.XTrimApprox(s.group.stream, s.group.option.StreamMaxLen)
			}
		}
	}()

	wg.Wait()

	return errors.New(fmt.Sprintf("listen redis stream [%s] error", s.group.stream))
}

// XClaim reassigns stale pending messages of the group's stream.
//
// It asks Redis for the pending summary of defaultGroup, lists each consumer's
// pending entries, and issues one XCLAIM per entry that has been idle for at
// least defaultIdle, handing it to the consumer named by
// s.group.nextConsumerId(0). Every Redis failure is skipped silently: a failed
// summary aborts the pass, a failed per-consumer listing skips that consumer,
// and the outcome of an individual claim is not inspected.
func (s streamC) XClaim() {
	client := s.group.client.client
	streamName := s.group.stream

	summary, err := client.XPending(streamName, defaultGroup).Result()
	if err != nil {
		return
	}

	for owner, total := range summary.Consumers {
		// Request as many entries as the summary reported for this consumer.
		entries, listErr := client.XPendingExt(&redis.XPendingExtArgs{
			Stream:   streamName,
			Group:    defaultGroup,
			Start:    "-",
			End:      "+",
			Consumer: owner,
			Count:    total,
		}).Result()
		if listErr != nil {
			continue
		}

		for _, entry := range entries {
			if entry.Idle >= defaultIdle {
				// Claim result is intentionally ignored; the entry is simply
				// picked up again on the next pass if this fails.
				client.XClaim(&redis.XClaimArgs{
					Stream:   streamName,
					Group:    defaultGroup,
					Consumer: s.group.nextConsumerId(0),
					MinIdle:  defaultIdle,
					Messages: []string{entry.Id},
				})
			}
		}
	}
}

// bind performs a single XREADGROUP for one consumer slot and processes every
// message it returns.
//
// consumerId selects the consumer name via s.group.nextConsumerId. start is the
// stream ID handed to XREADGROUP; Run passes ">" (messages never delivered to
// any consumer of the group) and "0" (this consumer's own pending messages).
// The read fetches up to group.option.FetchCount messages and blocks for at
// most 10 seconds.
//
// Per message:
//   - if it carries a "payload" field, the field is JSON-decoded into a
//     Payload, tagged with the message ID and stream name, and passed to the
//     group's callback. A non-string or undecodable payload, or a callback
//     error, leaves the message unacknowledged (pending) so it is seen again.
//   - otherwise, and after a successful callback, the message is acknowledged
//     and, if the acknowledgement succeeded, deleted from the stream.
//
// bind currently always returns nil: read failures (including the redis.Nil
// reply produced when the blocking read times out) are treated as "nothing to
// read" so that the caller's loop simply tries again.
func (s streamC) bind(consumerId int, start string) error {
	args := redis.XReadGroupArgs{
		Group:    defaultGroup,
		Consumer: s.group.nextConsumerId(consumerId),
		Count:    s.group.option.FetchCount,
		Streams:  []string{s.group.stream, start},
		NoAck:    false,
		Block:    time.Second * 10,
	}
	streams, err := s.group.client.client.XReadGroup(&args).Result()
	if err != nil {
		// Deliberately not propagated: a timeout or transient Redis error must
		// not terminate the consumer goroutine in Run.
		return nil
	}

	for _, stream := range streams {
		for _, message := range stream.Messages {
			if raw := message.Values["payload"]; raw != nil {
				// Checked assertion: an unexpected value type must not panic
				// the consumer goroutine.
				text, ok := raw.(string)
				if !ok {
					continue
				}

				var payload Payload
				if err := json.Unmarshal([]byte(text), &payload); err != nil {
					// NOTE(review): an undecodable payload is never acknowledged,
					// so it stays pending and is retried on every pass — confirm
					// whether such messages should be dropped instead.
					continue
				}
				payload.ID = message.ID
				payload.Stream = stream.Stream
				if err := s.group.callback(&payload); err != nil {
					continue
				}
			}

			// Acknowledge with the ID Redis delivered, not payload.ID: the
			// latter is empty for messages without a payload field and may be
			// modified by the callback.
			if err := s.group.client.client.XAck(s.group.stream, defaultGroup, message.ID).Err(); err == nil {
				s.group.client.client.XDel(s.group.stream, message.ID)
			}
		}
	}

	return nil
}
